package com.wudl.realproces

import java.util.Properties

import com.alibaba.fastjson.JSON
import com.wudl.realproces.bean.{ClickLog, Message}
import com.wudl.realproces.utils.GlobalConfigUtil
import org.apache.commons.math.stat.descriptive.rank.Max
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object App {

  /**
   * Entry point of the real-time processing job.
   *
   * Pipeline: Kafka (JSON click-log messages) -> parse to `Message` ->
   * assign event-time timestamps/watermarks -> (preprocessing TBD) -> execute.
   */
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time as the stream time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Set parallelism to 1
    env.setParallelism(1)
    // Smoke test: verify the environment works
    env.fromCollection(List("hadoop", "hive")).print()

    /**
     * Enable checkpointing so the job can recover safely
     * during long-running execution.
     */

    // Trigger a checkpoint every 5 seconds
    env.enableCheckpointing(5000)
    // Exactly-once checkpointing semantics
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // Minimum pause between two consecutive checkpoints (ms)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
    // Checkpoint timeout (ms)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    // Maximum number of concurrent checkpoints.
    // BUG FIX: this previously called setMinPauseBetweenCheckpoints(1) again,
    // which never set the concurrency AND overwrote the 1000 ms pause above.
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // Retain externalized checkpoints when the job is cancelled
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Checkpoint storage location on HDFS
    env.setStateBackend(new FsStateBackend("hdfs://node01.com:8020/fink-checkpoint/"))


    /**
     * *************************** Flink + Kafka integration *******************************
     */
    val properties = new Properties()
    // Kafka cluster address
    properties.setProperty("bootstrap.servers", GlobalConfigUtil.bootstrapServers)
    // ZooKeeper cluster address
    properties.setProperty("zookeeper.connect", GlobalConfigUtil.zookeeperConnect)
    // Kafka topic name
    properties.setProperty("input.topic", GlobalConfigUtil.inputTopic)
    // Consumer group ID
    properties.setProperty("group.id", GlobalConfigUtil.groupId)
    // Auto-commit consumed offsets back to Kafka
    properties.setProperty("enable.auto.commit", GlobalConfigUtil.enableAutoCommit)
    // Auto-commit interval in milliseconds
    properties.setProperty("auto.commit.interval.ms", GlobalConfigUtil.autoCommitIntervalMs)
    // Where to start consuming when there is no committed offset
    properties.setProperty("auto.offset.reset", GlobalConfigUtil.autoOffsetReset)

    // Kafka consumer: topic + string deserializer + connection properties
    val consumer = new FlinkKafkaConsumer010[String](GlobalConfigUtil.inputTopic, new SimpleStringSchema(), properties)

    val kafkaStream: DataStream[String] = env.addSource(consumer)
    //    kafkaStream.print()
    // Parse each JSON record into a Message
    // NOTE(review): getLong returns null for a missing key and would NPE on
    // unboxing — assumes upstream always provides "count"/"timeStamp"; confirm.

    val tunlpDataStream = kafkaStream.map {
      msgjson =>
        val jsonObject = JSON.parseObject(msgjson)
        val message = jsonObject.getString("message")
        val count = jsonObject.getLong("count")
        val timeStamp = jsonObject.getLong("timeStamp")

        Message(ClickLog(message), count, timeStamp)
    }
    tunlpDataStream.print()

    /**
     * ------------------------------- Watermark support ----------------------------------
     */
    val watemarkDataStream = tunlpDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message] {
      // Highest event timestamp seen so far
      var currentTimeStamp = 0L
      // Allowed out-of-orderness (ms)
      var maxDelayTime = 2000L

      // Periodic watermark: max timestamp seen minus the allowed delay
      override def getCurrentWatermark: Watermark = {
        new Watermark(currentTimeStamp - maxDelayTime)
      }

      // Extract the event timestamp from the element.
      // BUG FIX: previously currentTimeStamp was overwritten with
      // max(element.timeStamp, previousElementTimestamp), which could move
      // the tracked maximum BACKWARDS on a late element and regress the
      // watermark basis. Keep the running maximum monotonic instead.
      override def extractTimestamp(element: Message, previousElementTimestamp: Long): Long = {
        currentTimeStamp = Math.max(currentTimeStamp, Math.max(element.timeStamp, previousElementTimestamp))
        element.timeStamp
      }
    })

    // Data preprocessing (TODO)


    env.execute()


  }

}
